# 01 Python并发的上下文管理 梳理在Python的各种并发场景,各种隔离级别的数据定义方式,当前重点关注Python内存中的定义和使用要点; ## 1.1 线程数据隔离threading.local() 代码的运行都是有上下文环境的,即运行内存空间中的局部变量是什么,全局变量是什么. 一般情况下,全局变量的名称空间(比如模块中定义的变量)对于不同的线程,都是统一的,每一个线程都能去读,写它. 当处于并发的情况下,如果想要不改变变量位于全局代码位置的封装形式,又想让变量在各个线程中都是独立的, 这时候就需要要使用thread.local(). ```python import threading context = threading.local() context.value = 0 # 初始值 context.name = "global" def func1(): print("in func1:") print("context.value is {}, context.name is {}".format(context.value, context.name)) def func2(): print("in func2:") context.value = 2 # 子线程内部初始化 context.name = "func2" print("context.value is {}, context.name is {}".format(context.value, context.name)) def main(): print("in main first: context.value is {}, context.name is {}".format( context.value, context.name)) t = threading.Thread(target=func1) t.start() t.join() t = threading.Thread(target=func2) t.start() t.join() print("in main second: context.value is {}, context.name is {}".format( context.value, context.name)) if __name__ == "__main__": main() ``` 运行代码输出如下结果: ``` in main first: context.value is 0, context.name is global in func1: Exception in thread Thread-1 (func1): Traceback (most recent call last): File "/usr/lib/python3.10/threading.py", line 1016, in _bootstrap_inner self.run() File "/usr/lib/python3.10/threading.py", line 953, in run self._target(*self._args, **self._kwargs) File "/root/workspace/writedocs/source/01python/demo.py", line 12, in func1 print("context.value is {}, context.name is {}".format(context.value, context.name)) AttributeError: '_thread._local' object has no attribute 'value' in func2: context.value is 2, context.name is func2 in main second: context.value is 0, context.name is global ``` > **Note:** > > 当在func1中,子线程没有设置值,所有抛出了异常; > 而在func2中,子线程设置了值,但是设置的值并没有影响外面主线程的变量值; > 我们可以得出如下结论: > threading.local()的属性在每一个线程都是独立,并且都需要独立初始化. > 每个线程在修改属性变量时,不会影响其他线程的数据.这样就达到了数据在不同线程独立的目的. > > **每个线程的数据都是独立的,需要在线程启动的时候初始化,否则在读取的时候就会报错;** ## 1.2 基于ContextVar的上下文数据隔离 ### 1.2.1 要点整理 + contextvars.Context(上下文)是 ContextVar 及其当前值的映射容器。 + 每个线程对应一个 PyThreadState, PyThreadState 中维护当前激活的 Context(current_context)。 + asyncio.Task 保存一个独立的 Context, 该 Context 在 Task 创建时通过 copy_context() 捕获。 当 Task 被调度运行时, 其保存的 Context 会被设置为线程当前激活的 Context。 + copy_context() 的时间复杂度为 O(1)。 + 它不会遍历复制所有 ContextVar。 + 新旧 Context 会共享底层状态结构, 因此复制成本与 ContextVar 数量无关。 + 当执行 ContextVar.set() 时, 当前 Context 会产生新的状态版本, 修改仅对当前 Context 可见, 不会影响其他共享历史状态的 Context。 + context.run(func, *args, **kwargs) 的本质是在当前线程中临时切换 current_context。 + 执行前保存旧 Context。 + 将 current_context 设置为指定 Context。 + 执行 func。 + func 退出后恢复原 Context。 + run() 不会自动执行 copy_context(), 也不会创建新的 Context, 而是直接在传入的 Context 上运行代码。 + 同一个 Context 不允许被并发进入。 如果 context_a.run(...) 尚未退出, 另一个线程(或执行流)再次进入同一个 Context, 将抛出 RuntimeError。 ### 1.2.2 验证示例 ```python import asyncio import contextvars import threading import time import sys import gc # 1. 定义一个用于追踪内存死活的测试对象 class MemoryTracker: def __init__(self, name): self.name = name print(f"[创建对象]:{self.name}") def __del__(self): print(f"[释放对象]:{self.name}") ctx_var = contextvars.ContextVar("my_tracker") def thread_worker(thread_name, obj_name): print(f"\n[线程 {thread_name}] 启动...") # 在当前线程的 Context 树中写入变量 tracker = MemoryTracker(obj_name) ctx_var.set(tracker) print(f"[线程 {thread_name}] 读取当前内存地址: {hex(id(ctx_var.get()))}") time.sleep(1.5) # 故意阻塞,等待另一个线程干扰 # 再次读取,验证是否被污染 print(f"[线程 {thread_name}] 阻塞结束后再次读取: {ctx_var.get().name}") # --- 协程工作函数 --- async def async_worker(task_name, obj_name): print(f"\n[协程 {task_name}] 启动...") # 协程启动时自动隐式触发 copy_context() 软拷贝 tracker = MemoryTracker(obj_name) ctx_var.set(tracker) print(f"[协程 {task_name}] 读取当前内存地址: {hex(id(ctx_var.get()))}") # 遇到 await 挂起,此时事件循环会切走线程的 current_context 指针 await asyncio.sleep(1) # 唤醒后再次读取,验证指针切回来后数据是否完好 print(f"[协程 {task_name}] 唤醒后再次读取: {ctx_var.get().name}") def run_isolation_test(): # A. 启动两个线程(物理隔离) t1 = threading.Thread(target=thread_worker, args=("A", "Thread-Data-A")) t2 = threading.Thread(target=thread_worker, args=("B", "Thread-Data-B")) t1.start() t2.start() t1.join() t2.join() # B. 在单线程内启动两个协程(指针频繁闪现切换) async def main_async(): await asyncio.gather( async_worker("1", "Coroutine-Data-1"), async_worker("2", "Coroutine-Data-2") ) asyncio.run(main_async()) def run_lifecycle_test(): # 创建一个局部的旧上下文 old_ctx = contextvars.copy_context() def scope_func(): # 在这个临时环境中存入对象 local_tracker = MemoryTracker("Lifecycle-Target") ctx_var.set(local_tracker) print(f" [当前状态] 对象在临时 Context 中的引用计数: {sys.getrefcount(local_tracker) - 1}") print("\n[步骤 1] 通过 ctx.run 进入临时上下文并赋值:") old_ctx.run(scope_func) print("\n[步骤 2] 退出 ctx.run,回到外层上下文。") print(" 此时线程指针已经切走,原临时 Context 失去强引用。") print("\n[步骤 3] 手动触发垃圾回收(GC)...") del old_ctx # 销毁上下文容器 gc.collect() # 强制触发 Python 内存回收 print("\n[测试结束] 如果上面没有打印 [释放对象],说明发生了内存泄漏。") if __name__ == "__main__": run_isolation_test() run_lifecycle_test() ```